home *** CD-ROM | disk | FTP | other *** search
/ InfoMagic Internet Tools 1993 July / Internet Tools.iso / RockRidge / mail / pp / pp-6.0 / Chans / splitter / splitter.c < prev    next >
Encoding:
C/C++ Source or Header  |  1991-12-18  |  9.0 KB  |  410 lines

  1. /* splitter.c: resubmit messages so as to have indivual processes for recips */
  2.  
  3. # ifndef lint
  4. static char Rcsid[] = "@(#)$Header: /xtel/pp/pp-beta/Chans/splitter/RCS/splitter.c,v 6.0 1991/12/18 20:12:36 jpo Rel $";
  5. # endif
  6.  
  7. /*
  8.  * $Header: /xtel/pp/pp-beta/Chans/splitter/RCS/splitter.c,v 6.0 1991/12/18 20:12:36 jpo Rel $
  9.  *
  10.  * $Log: splitter.c,v $
  11.  * Revision 6.0  1991/12/18  20:12:36  jpo
  12.  * Release 6.0
  13.  *
  14.  */
  15.  
  16.  
  17. #include "util.h"
  18. #include "qmgr.h"
  19. #include "q.h"
  20. #include "prm.h"
  21. #include "retcode.h"
  22. #include <sys/types.h>
  23. #include <sys/stat.h>
  24. #include "sys.file.h"
  25.  
  26. static void dirinit();
  27. static int initialise();
  28. static int endfunc();
  29. static struct type_Qmgr_DeliveryStatus *process();
  30. static ADDR *getnthrecip ();
  31. static int submit_error ();
  32. static void set_success();
  33. static int processAddr();
  34.  
  35. CHAN    *mychan;
  36. char    *this_msg = NULLCP, *this_chan = NULLCP;
  37. int    first_successDR, first_failureDR, start_submit;
  38. int    linked = TRUE;
  39.  
  40. main (argc, argv)
  41. int    argc;
  42. char    **argv;
  43. {
  44.     sys_init (argv[0]);
  45.     dirinit();
  46.  
  47. #ifdef PP_DEBUG
  48.     if (argc > 1 && strcmp (argv[1], "debug") == 0)
  49.         debug_channel_control (argc,argv,initialise,process,endfunc);
  50.     else
  51. #endif
  52.         channel_control (argc,argv,initialise,process,endfunc);
  53. }
  54.  
  55. /*   */
  56. /* move to correct place in file system */
  57.  
  58. extern char    *quedfldir;
  59.  
  60. static void dirinit ()
  61. {
  62.     if (chdir (quedfldir) < 0)
  63.         err_abrt (RP_LIO,
  64.               "Unable to change directory to '%s'",
  65.               quedfldir);
  66. }
  67.  
  68. /*   */
  69. /* channel initialisation routine */
  70.  
  71. static void parse_ch_info();
  72.  
  73. static int initialise (arg)
  74. struct type_Qmgr_Channel    *arg;
  75. {
  76.     char    *name;
  77.     name = qb2str (arg);
  78.  
  79.     if ((mychan = ch_nm2struct (name)) == NULLCHAN) {
  80.         PP_OPER (NULLCP, ("Channel '%s' not known", name));
  81.         if (name != NULLCP) free(name);
  82.         return NOTOK;
  83.     }
  84.  
  85.     if (name != NULLCP) free(name);
  86.     if (mychan->ch_out_info != NULLCP)
  87.         parse_ch_info (mychan->ch_out_info);
  88.  
  89.     start_submit = TRUE;
  90.     return OK;
  91. }
  92.  
  93. /* parse the info string and set the global variables */
  94. static void parse_ch_info(info)
  95. char    *info;
  96. {
  97.     char    *info_copy, *margv[20];
  98.     int    margc, ix;
  99.  
  100.     if (info == NULLCP) return;
  101.  
  102.     info_copy = strdup(info);
  103.  
  104.     if ((margc = sstr2arg (info_copy, 20, margv, ",")) > 0) {
  105.         for (ix = 0; ix < margc; ix++) {
  106.             /* check each entry in info string */
  107.             if (lexequ(margv[ix], "notlinked") == 0)
  108.                 linked = FALSE;
  109.             else if (lexequ(margv[ix], "linked") == 0)
  110.                 linked = TRUE;
  111.             else 
  112.                 PP_LOG(LLOG_EXCEPTIONS,
  113.                        ("Unknown ch_info flag '%s'",
  114.                     margv[ix]));
  115.         }
  116.     }
  117.     free(info_copy);
  118. }
  119.  
  120. /*   */
  121. /* channel termination routine */
  122.  
  123. static int endfunc (arg)
  124. struct type_Qmgr_Channel *arg;
  125. {
  126.     if (start_submit == FALSE)
  127.         io_end (OK);
  128.     start_submit = TRUE;
  129. }
  130.  
  131. /*   */
  132. /* channel work routine */
  133.  
  134. static int security_check (msg)
  135. struct type_Qmgr_ProcMsg *msg;
  136. {
  137.     if (this_msg) free(this_msg);
  138.     this_msg = qb2str (msg->qid);
  139.     
  140.     if (this_chan) free(this_chan);
  141.     this_chan = qb2str (msg->channel);
  142.     
  143.     if (mychan == NULLCHAN
  144.         || strcmp (this_chan, mychan->ch_name) != 0) {
  145.         PP_LOG (LLOG_EXCEPTIONS,
  146.             ("channel error: '%s'",
  147.              this_chan));
  148.         return FALSE;
  149.     }
  150.  
  151.     return TRUE;
  152. }
  153.  
  154. static struct type_Qmgr_DeliveryStatus *process (arg)
  155. struct type_Qmgr_ProcMsg *arg;
  156. {
  157.     struct prm_vars            prm;
  158.     Q_struct            que;
  159.     ADDR                *sender = NULLADDR;
  160.     ADDR                *recips = NULLADDR;
  161.     ADDR                *adr;
  162.     int                rcount, retval;
  163.     struct type_Qmgr_UserList    *ix;
  164.     RP_Buf                *reply;
  165.  
  166.     bzero ((char *)&prm, sizeof (prm));
  167.     bzero ((char *)&que, sizeof (que));
  168.  
  169.     delivery_init (arg->users);
  170.     delivery_setall (int_Qmgr_status_messageFailure);
  171.     first_failureDR = first_successDR = TRUE;
  172.  
  173.     if (security_check (arg) != TRUE)
  174.         return deliverystate;
  175.  
  176.     PP_LOG (LLOG_NOTICE,
  177.         ("processing msg '%s' through '%s'",
  178.          this_msg, this_chan));
  179.  
  180.     /* read in message */
  181.     if (rp_isbad (rd_msg (this_msg, &prm, &que,
  182.                   &sender, &recips, &rcount))) {
  183.         PP_LOG (LLOG_EXCEPTIONS,
  184.             ("rd_msg error: '%s'", this_msg));
  185.         rd_end();
  186.         return delivery_setallstate(int_Qmgr_status_messageFailure,
  187.                         "Can't read message");
  188.     }
  189.  
  190.     /* process recipients individually */
  191.     for (ix = arg->users; ix; ix = ix->next) {
  192.         if ((adr = getnthrecip (&que, ix->RecipientId->parm)) == NULLADDR) {
  193.             PP_LOG (LLOG_EXCEPTIONS,
  194.                 ("failed to find recipient %d of msg '%s'",
  195.                  ix->RecipientId->parm, this_msg));
  196.             delivery_setstate (ix->RecipientId->parm,
  197.                        int_Qmgr_status_messageFailure,
  198.                        "Unable to find specified recipient");
  199.             continue;
  200.         }
  201.  
  202.         if (start_submit == TRUE && rp_isbad (io_init (&reply))) {
  203.             submit_error (adr, "io_init", reply);
  204.             rd_end();
  205.             return delivery_setallstate (int_Qmgr_status_messageFailure,
  206.                              "Unable to start submit");
  207.         } else
  208.             start_submit = FALSE;
  209.         
  210.         switch (chan_acheck (adr, mychan, 1, (char **)NULL)) {
  211.             default:
  212.             case NOTOK:
  213.             break;
  214.             case OK:
  215.             processAddr (this_msg, &prm, &que, adr);
  216.             break;
  217.         }
  218.     }
  219.  
  220.     /* write results of processing */
  221.  
  222.     if (rp_isbad (retval = wr_q2dr (&que, this_msg))) {
  223.         PP_LOG (LLOG_EXCEPTIONS,
  224.             ("%s wr_q2dr failure '%d'",
  225.              mychan->ch_name, retval));
  226.         (void) delivery_resetDRs (int_Qmgr_status_messageFailure);
  227.     }
  228.  
  229.     rd_end ();
  230.     q_free(&que);
  231.     prm_free (&prm);
  232.     
  233.     return deliverystate;
  234. }
  235.  
  236. /*   */
  237. /* do processing for specified recip */
  238.  
  239. static int processAddr (msg, prm, qp, recip)
  240. char        *msg;
  241. struct prm_vars    *prm;
  242. Q_struct    *qp;
  243. ADDR        *recip;
  244. {
  245.     ADDR    *sender = adr_new((qp->Oaddress->ad_type == AD_X400_TYPE) ?
  246.                   qp->Oaddress->ad_r400adr :
  247.                   qp->Oaddress->ad_r822adr,
  248.                   qp->Oaddress->ad_type,
  249.                   0);
  250.  
  251.     ADDR    *recipient = adr_new((recip->ad_type == AD_X400_TYPE) ?
  252.                   recip->ad_r400adr :
  253.                   recip->ad_r822adr,
  254.                   recip->ad_type,
  255.                   1);
  256.     Q_struct    qs;
  257.     struct timeval data_time;
  258.     struct stat st;
  259.     RP_Buf    reply;
  260.     char    *msgdir = NULLCP,
  261.         file[MAXPATHLENGTH],
  262.         buf[BUFSIZ],
  263.         *strippedname;
  264.     int    dirlen,
  265.         size,
  266.         fd_in, n;
  267.  
  268.     if (qid2dir (msg, recip, TRUE, &msgdir) != OK) {
  269.         PP_LOG (LLOG_EXCEPTIONS,
  270.             ("msg dir not found for recip %d of msg '%s'",
  271.              recip->ad_no, msg));
  272.         delivery_setstate (recip->ad_no,
  273.                    int_Qmgr_status_messageFailure,
  274.                    "source directory not found");
  275.         return 0;
  276.     }
  277.     
  278.     q_init (&qs);
  279.     q_almost_dup (&qs, qp);
  280.     qs.encodedinfo.eit_types = list_bpt_dup(qp->encodedinfo.eit_types);
  281.     
  282.     qs.inbound = list_rchan_new (qp->inbound->li_mta,
  283.                      qp->inbound->li_chan->ch_name);
  284.     prm->prm_opts = prm->prm_opts | PRM_NOTRACE | PRM_ACCEPTALL;
  285.  
  286.     /* now resubmit */
  287.     timer_start(&data_time);
  288.  
  289.     if (rp_isbad (io_wprm (prm, &reply)))
  290.         return submit_error (recip, "io_wprm", &reply);
  291.  
  292.     if (rp_isbad (io_wrq (&qs, &reply)))
  293.         return submit_error (recip, "io_wrq", &reply);
  294.  
  295.     if (rp_isbad (io_wadr (sender, AD_ORIGINATOR, &reply))) {
  296.         (void) sprintf (buf, "io_wadr(%s)", sender->ad_value);
  297.         return submit_error(recip, buf, &reply);
  298.     }
  299.  
  300.     if (rp_isbad (io_wadr (recipient, AD_RECIPIENT, &reply))) {
  301.         (void) sprintf (buf, "io_wadr(%s)", recipient->ad_value);
  302.         return submit_error(recip, buf, &reply);
  303.     }
  304.  
  305.     if (rp_isbad (io_adend (&reply)))
  306.         return submit_error (recip, "io_adend", &reply);
  307.  
  308.     /* submit body of message */
  309.  
  310.     if (rp_isbad (io_tinit (&reply)))
  311.         return submit_error (recip, "io_tinit", &reply);
  312.     dirlen = strlen (msgdir) +1;
  313.  
  314.     msg_rinit (msgdir);
  315.  
  316.     size = 0;
  317.     while (msg_rfile (file) != RP_DONE) {
  318.  
  319.         /* --- transmit file --- */
  320.         strippedname = file + dirlen;
  321.         if (stat (file, &st) != NOTOK)
  322.             size += st.st_size;
  323.         if (linked == TRUE) {
  324.             (void) sprintf(buf, "%s %s",strippedname, file);
  325.             if (rp_isbad (io_tpart (buf, TRUE, &reply))) 
  326.                 return submit_error (recip,"io_tpart",&reply);
  327.         } else {
  328.             if (rp_isbad (io_tpart (strippedname, FALSE, &reply))) 
  329.                 return submit_error (recip,"io_tpart",&reply);
  330.  
  331.             if ((fd_in = open (file, O_RDONLY)) == -1) {
  332.                 (void) strcpy (reply.rp_line,file);
  333.                 return submit_error (recip,"open",&reply);
  334.             }
  335.             while ((n = read (fd_in, buf, BUFSIZ)) > 0) {
  336.                 if (rp_isbad (io_tdata (buf, n))) {
  337.                     (void) strcpy (reply.rp_line,"???");
  338.                     return submit_error (recip,"io_tdata",&reply);
  339.                 }
  340.             }
  341.  
  342.             close (fd_in);
  343.             if (rp_isbad (io_tdend (&reply)))
  344.                 return submit_error (recip,"io_tdend", &reply);
  345.         }
  346.     }
  347.     msg_rend();
  348.     if (rp_isbad (io_tend (&reply)))
  349.         return submit_error (recip,"io_tend", &reply);
  350.     set_success (recip,qp, size);
  351.  
  352.     q_free (&qs);
  353.  
  354.     timer_end (&data_time, size, "Data submitted");
  355.  
  356.     return 0;
  357. }
  358.  
  359. /*   */
  360.  
  361. static void set_success (recip, que, size)
  362. ADDR        *recip;
  363. Q_struct    *que;
  364. int        size;
  365. {
  366.     (void) wr_ad_status (recip, AD_STAT_DONE);
  367.     (void) wr_stat (recip, que, this_msg, size);
  368.     delivery_set (recip->ad_no, int_Qmgr_status_success);
  369. }
  370.  
  371. static ADDR *getnthrecip(que, num)
  372. Q_struct    *que;
  373. int        num;
  374. {
  375.     ADDR *ix = que->Raddress;
  376.  
  377.     if (num == 0)
  378.         return que->Oaddress;
  379.     while ((ix != NULL) && (ix->ad_no != num))
  380.         ix = ix->ad_next;
  381.     return ix;
  382. }
  383.  
  384. static int submit_error (recip, proc, reply)
  385. ADDR    *recip;
  386. char    *proc;
  387. RP_Buf    *reply;
  388. {
  389.     char    buf[BUFSIZ];
  390.     PP_LOG (LLOG_EXCEPTIONS,
  391.            ("Chans/list %s failure [%s]", proc, reply->rp_line));
  392.  
  393.     if (recip != NULLADDR) {
  394.         (void) sprintf (buf,
  395.                 "'%s' failure for '%s' [%s]",
  396.                 proc,
  397.                 this_msg,
  398.                 reply -> rp_line);
  399.         PP_OPER(NULLCP,("%s", buf));
  400.         delivery_setstate (recip->ad_no, 
  401.                    int_Qmgr_status_messageFailure,
  402.                    buf);
  403.     }
  404.  
  405.     start_submit = TRUE;
  406.     io_end (NOTOK);
  407.  
  408.     return OK;
  409. }
  410.